﻿
using EmperialApps.WeatherSpark.Data;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Timers;

namespace EmperialApps.WeatherSpark.Service.Internal {

    using MessagePair = Pair<Coordinate, CloudQueueMessage>;


    /// <summary>Acts as a location scheduler for forecast locations, backed by an Azure message queue.</summary>
    /// <remarks>
    /// Messages handed out by <c>TryGetNext</c> are remembered in a list kept sorted by location until the
    /// caller either reschedules them through <c>Schedule</c> or deletes them through <c>Remove</c>.
    /// </remarks>
    internal sealed class LocationQueueScheduler : ILocationScheduler {

        /// <summary>Initializes a new instance of the <see cref="LocationQueueScheduler"/> class.</summary>
        /// <param name="account">The storage account hosting the location queue (and the root blob container).</param>
        /// <param name="forecastInfo">The table store consulted to decide whether a location needs processing.</param>
        /// <param name="refresh">
        /// If <c>true</c>, starts a timer that periodically re-queues out of date forecasts and uploads the
        /// place corrections to the "corrections" blob of the root container.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="account"/> is <c>null</c>.</exception>
        public LocationQueueScheduler( CloudStorageAccount account, ForecastInfoTableStore forecastInfo, bool refresh ) {
            if( account == null )
                throw new ArgumentNullException( "account" );

            this._forecastInfo = forecastInfo;

            // Retrieve location message queue.
            var client = account.CreateCloudQueueClient( );
            this._locationQueue = client.GetQueueReference( LocationMessageQueue );
            this._locationQueue.CreateIfNotExists( );

            // Start refresh timer and update place corrections, if requested.
            if( refresh ) {
                this._refreshTimer = new Timer { Interval = 2 * UpdateLimit.TotalMilliseconds };
                this._refreshTimer.Elapsed += this.OnRefreshTimerTick;
                this._refreshTimer.Enabled = true;

                // Uploading the corrections is best effort: a failure is traced, but does not prevent construction.
                TraceEventType.Verbose.Trace( "Updating place corrections..." );
                try {
                    string corrections = default( Place.Corrections ).Save( );
                    byte[] bytes = System.Text.Encoding.UTF8.GetBytes( corrections );

                    var rootClient = account.CreateCloudBlobClient( );
                    var root = rootClient.GetRootContainerReference( );
                    root.CreateIfNotExists( );

                    var blob = root.GetBlockBlobReference( "corrections" );
                    using( var stream = blob.OpenWrite( ) )
                        stream.Write( bytes, 0, bytes.Length );
                }
                catch( Exception ex ) {
                    ex.Trace( "Could not update place corrections" );
                }
            }
            else {
                this._refreshTimer = null;
            }
        }


        /// <inheritdoc/>
        /// <remarks>Enqueues a new message whose content is the name of <paramref name="location"/>.</remarks>
        public void AddLocation( Coordinate location ) {
            TraceEventType.Verbose.Trace( "Adding download message for " + location );
            var message = new CloudQueueMessage( location.GetName( ) );
            this._locationQueue.AddMessage( message );
        }

        /// <inheritdoc/>
        /// <remarks>
        /// Dequeues at most one message. Returns <c>false</c> both when the queue yields no message and when the
        /// dequeued message turns out to be redundant (in which case it is deleted from the queue).
        /// </remarks>
        public bool TryGetNext( out Coordinate location ) {
            // The message stays invisible to other consumers for ProcessLimit while it is being processed.
            var message = this._locationQueue.GetMessage( ProcessLimit );

            bool process = this.ShouldProcessMessage( message, out location );
            if( process ) {
                TraceEventType.Information.Trace( "Processing download message {0} for {1}", message.Id, location );

                // Remember the message, keeping the list sorted by location so that it can be binary searched.
                // When an entry for the same location already exists, the new pair is inserted right after it.
                MessagePair pair = new MessagePair( location, message );
                int index = this._unprocessedMessages.BinarySearch( pair, MessagePairComparer.Instance );
                this._unprocessedMessages.Insert( index < 0 ? ~index : 1 + index, pair );
            }

            return process;
        }

        /// <inheritdoc/>
        /// <remarks>
        /// Looks up the message previously handed out for <paramref name="location"/>, optionally retargets it
        /// to <paramref name="newLocation"/>, and makes it invisible in the queue for <c>ScheduleDelay</c>.
        /// A message that no longer exists in the queue is traced and ignored; other storage errors propagate.
        /// </remarks>
        public void Schedule( Coordinate location, Coordinate newLocation ) {
            CloudQueueMessage message;
            if( !this.GetUnprocessedMessage( location, out message ) ) {
                TraceEventType.Verbose.Trace( "Could not retrieve scheduling message for " + location );
                return;
            }

            var updates = MessageUpdateFields.Visibility;
            if( location != newLocation ) {
                TraceEventType.Information.Trace( "Retargeting download message {0} from {1} to {2}", message.Id, location, newLocation );
                message.SetMessageContent( newLocation.GetName( ) );
                updates |= MessageUpdateFields.Content;
            }

            TraceEventType.Verbose.Trace( "Scheduling download message {0} for {1}", message.Id, newLocation );
            try {
                this._locationQueue.UpdateMessage( message, ScheduleDelay, updates );
            }
            catch( StorageException ex ) {
                if( IsNotFound( ex ) )
                    TraceEventType.Verbose.Trace( "Could not reschedule message " + message.Id + " for " + location );
                else
                    throw;
            }
        }

        /// <inheritdoc/>
        /// <remarks>Deletes the message previously handed out for <paramref name="location"/>, if any.</remarks>
        public void Remove( Coordinate location ) {
            CloudQueueMessage message;
            if( this.GetUnprocessedMessage( location, out message ) )
                this.RemoveMessage( message, trace: true );
        }


        #region Private Members

        /// <summary>Name of the Azure queue holding the location messages.</summary>
        private const string LocationMessageQueue = "locations";

        /// <summary>Visibility timeout applied to a message when it is dequeued for processing.</summary>
        private static readonly TimeSpan ProcessLimit = TimeSpan.FromMinutes( 1.2 );

        /// <summary>Visibility timeout applied to a message when it is rescheduled.</summary>
#if DEBUG
        private static readonly TimeSpan ScheduleDelay = TimeSpan.FromHours( 0.01 );
#else
        private static readonly TimeSpan ScheduleDelay = TimeSpan.FromHours( 1.0 );
#endif

        /// <summary>Minimum age of the last processing before a known location is processed again.</summary>
        private static readonly TimeSpan UpdateLimit = TimeSpan.FromHours( 0.95 * ScheduleDelay.TotalHours );

        /// <summary>Age of the last processing beyond which the refresh timer re-queues a forecast.</summary>
        private static readonly TimeSpan RefreshLimit = TimeSpan.FromHours( 1.2 * ScheduleDelay.TotalHours );

        /// <summary>Messages handed out by <c>TryGetNext</c> and not yet scheduled or removed, sorted by location.</summary>
        private readonly List<MessagePair> _unprocessedMessages = new List<MessagePair>( );
        private readonly ForecastInfoTableStore _forecastInfo;
        private readonly CloudQueue _locationQueue;

        /// <summary>Refresh timer; <c>null</c> when refreshing was not requested.</summary>
        private readonly Timer _refreshTimer;


        /// <summary>Determines whether a storage exception reports that the target resource was not found.</summary>
        /// <param name="ex">The storage exception to inspect.</param>
        /// <returns><c>true</c> if the exception corresponds to an HTTP 404 response; otherwise <c>false</c>.</returns>
        private static bool IsNotFound( StorageException ex ) {
            // Prefer the structured status code; the exception text is not a stable contract.
            var request = ex.RequestInformation;
            if( request != null && request.HttpStatusCode == 404 )
                return true;

            // Fall back to the message text so that everything previously recognized still is.
            return ex.Message != null && ex.Message.Contains( "(404)" );
        }

        /// <summary>Queues a new download message for every forecast that has not been processed recently.</summary>
        private void OnRefreshTimerTick( object sender, EventArgs e ) {
            // An exception escaping an Elapsed handler may be suppressed by System.Timers.Timer,
            // so trace failures here rather than letting them vanish.
            try {
                DateTimeOffset minimumUpdate = DateTimeOffset.Now - RefreshLimit;
                TraceEventType.Verbose.Trace( "Refreshing download messages for forecasts not downloaded since {0:u}", minimumUpdate );

                // Queue new messages for any out of date forecasts.
                int refreshCount = 0;
                foreach( ForecastInfo info in this._forecastInfo.Get( ) ) {
                    DateTimeOffset lastUpdate = info.GetLastProcessed( );
                    if( info.HasForecast && lastUpdate < minimumUpdate ) {
                        this.AddLocation( info.GetLocation( ) );
                        ++refreshCount;
                    }
                }

                TraceEventType.Information.Trace( "Refreshing download messages created for {0} forecasts", refreshCount );
            }
            catch( Exception ex ) {
                ex.Trace( "Could not refresh download messages" );
            }
        }

        /// <summary>Retrieves, and forgets, the message previously handed out for a location.</summary>
        /// <param name="location">The location whose message is wanted.</param>
        /// <param name="message">Receives the message, or <c>null</c> if none was found.</param>
        /// <returns><c>true</c> if a non-null message was found; otherwise <c>false</c>.</returns>
        private bool GetUnprocessedMessage( Coordinate location, out CloudQueueMessage message ) {
            // The comparer only looks at the location, so a pair without a message works as search key.
            int index = this._unprocessedMessages.BinarySearch( new MessagePair( location, null ), MessagePairComparer.Instance );
            if( index < 0 ) {
                message = null;
                return false;
            }

            MessagePair pair = this._unprocessedMessages[index];
            this._unprocessedMessages.RemoveAt( index );
            message = pair.Item2;
            return message != null;
        }

        /// <summary>Deletes a message from the queue; storage errors are traced rather than thrown.</summary>
        /// <param name="message">The message to delete; <c>null</c> is ignored.</param>
        /// <param name="trace">Whether to trace the removal itself.</param>
        private void RemoveMessage( CloudQueueMessage message, bool trace ) {
            if( message == null )
                return;

            if( trace )
                TraceEventType.Information.Trace( "Removing download message: " + message.AsString );
            try {
                this._locationQueue.DeleteMessage( message );
            }
            catch( StorageException ex ) {
                if( IsNotFound( ex ) )
                    TraceEventType.Verbose.Trace( "Download message already removed: " + message.AsString );
                else
                    ex.Trace( "Error removing download message: " + message.AsString );
            }
        }

        /// <summary>Decides whether a dequeued message should be processed now.</summary>
        /// <param name="message">The dequeued message, or <c>null</c> if the queue returned none.</param>
        /// <param name="location">Receives the location parsed from the message content.</param>
        /// <returns>
        /// <c>true</c> if the location has no forecast info (or no forecast URL) yet, or was last processed at
        /// least <c>UpdateLimit</c> ago; otherwise <c>false</c>, in which case the redundant message is deleted.
        /// </returns>
        private bool ShouldProcessMessage( CloudQueueMessage message, out Coordinate location ) {
            if( message == null ) {
                location = default( Coordinate );
                return false;
            }

            location = Coordinate.Parse( message.AsString );
            var info = this._forecastInfo.Get( location );

            // If this is a new download request, process the message.
            bool process;
            if( info == null || info.ForecastUrl == null ) {
                TraceEventType.Information.Trace( "New download requested: " + location );
                process = true;
            }
            // Otherwise, check whether it has been processed recently.
            else {
                DateTimeOffset now = DateTimeOffset.Now;
                DateTimeOffset scheduleLimit = now - UpdateLimit;
                DateTimeOffset lastProcessed = info.GetLastProcessed( );

                process = lastProcessed <= scheduleLimit;

                // If it has been long enough since the last download, process the message
                // and record the new processing time right away.
                if( process ) {
                    info.SetLastProcessed( now );
                    this._forecastInfo.Update( info );
                }
                // Otherwise, remove redundant message.
                else {
                    TraceEventType.Verbose.Trace( "Removing redundant download message {0} for {1}", message.Id, location );
                    this.RemoveMessage( message, trace: false );
                }
            }

            return process;
        }


        /// <summary>Orders message pairs by their location only; the message itself is ignored.</summary>
        private sealed class MessagePairComparer : IComparer<MessagePair> {
            public static readonly MessagePairComparer Instance = new MessagePairComparer( );

            private MessagePairComparer( ) { }

            /// <summary>Compares the locations (<c>Item1</c>) of the two pairs.</summary>
            public int Compare( MessagePair x, MessagePair y ) {
                return x.Item1.CompareTo( y.Item1 );
            }
        }

        #endregion
    }

}
